package com.alinesno.cloud.compoment.kafka.config;

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

/**
 * kafka 发送配置
 * @author LuoAnDong
 * @since 2019年4月9日 上午11:15:09
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

	private static final Logger log = LoggerFactory.getLogger(KafkaProducerConfig.class) ; 
	
	@Autowired
	private KafkaProperties KafkaProperties ; 
	
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = KafkaProperties.buildProducerProperties() ; 
        
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
    	log.debug("初始化客户端kafka配置:{}" , KafkaProperties);
    	
        return this.producerConfigs(props);
    }
    
    @Bean
    public Map<String, Object> producerConfigs(Map<String, Object> props) {
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}